跳到主要内容

Java Netty 使用 WebSocket

使用 Netty 搭配 WebSocket 编写一个基于浏览器的聊天应用程序

1、客户端发送一个消息; 2、该消息将被广播到所有其他连接的客户端

这个示例只写了服务端

配置环境

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.66.Final</version>
</dependency>

如何支持 WebSocket

在从标准的 HTTP 或者 HTTPS 协议切换到 WebSocket 时,将会使用一种称为升级握手的机制。因此,使用 WebSocket 的应用程序将始终以 HTTP/S 作为开始,然后再执行升级。

这个升级动作发生的确切时刻特定于应用程序:它可能会发生在启动时,也可能会发生在请求了某个特定的 URL 之后。

我们的应用程序将采用下面的约定:如果被请求的 URL 以 /ws 结尾,那么我们将会把该协议升级为 WebSocket 否则,服务器将使用基本的 HTTP/S。在连接已经升级完成之后,所有数据都将会使用 WebSocket 进行传输。

图 12-2 说明了该服务器逻辑,一如在 Netty 中一样,它由一组 ChannelHandler 实现。

WebSocket 属性

以下是 WebSocket 对象的属性。假定我们使用了以上代码创建了 Socket 对象:

WebSocket 事件

以下是 WebSocket 对象的相关事件。假定我们使用了以上代码创建了 Socket 对象:

事件事件处理程序描述
openSocket.onopen连接建立时触发
messageSocket.onmessage客户端接收服务端数据时触发
errorSocket.onerror通信发生错误时触发
closeSocket.onclose连接关闭时触发

WebSocket 控制帧

WebSocket 控制帧有 3 种: Close(关闭帧)、Ping 以及 Pong。

控制帧的操作码定义了 0x08(关闭帧)、0x09(Ping 帧)、0x0A(Pong 帧)。

Close 关闭帧很容易理解,客户端如果接受到了就关闭连接,客户端也可以发送关闭帧给服务端。Ping 和 Pong 是 websocket 里的心跳,用来保证客户端是在线的,一般来说只有服务端给客户端发送 Ping,然后客户端发送 Pong 来回应,表明自己仍然在线。

OPCODE:4位 , 解释 PayloadData,如果接收到未知的 opcode,接收端必须关闭连接。 0x0 表示附加数据帧 0x1 表示文本数据帧 0x2 表示二进制数据帧 0x3-7 暂时无定义,为以后的非控制帧保留 0x8 表示连接关闭 0x9 表示ping 0xA 表示pong 0xB-F 暂时无定义,为以后的控制帧保留

处理 HTTP 请求

首先,我们将实现该处理 HTTP 请求的组件。这个组件将 提供用于访问聊天室并显示由连接的客户端发送的消息的网页

其扩展了 SimpleChannel InboundHandler 以处理 FullHttpRequest 消息。需要注意的是,channelRead0() 方法的实现是如何转发任何目标 URI 为 /ws 的请求的。

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;

static {
// 当前位置
URL location = HttpRequestHandler.class
.getProtectionDomain()
.getCodeSource()
.getLocation();

try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html");
}
}

public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.uri())) { // 如果请求了 WebSocket 协议升级,则增加引用计数
// fireChannelRead 表示传递消息至下一个处理器
ctx.fireChannelRead(request.retain());
} else {
// 如果状态码显示是 100,处理 100 Continue 请求以符号 HTTP1.1 规则
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // 读取 index.html
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
response.headers().set(
HttpHeaderNames.CONTENT_TYPE,
"text/plain; charset=UTF-8"
);
boolean keepAlive = HttpUtil.isKeepAlive(request);
if (keepAlive) { // 如果请求了 keepAlive 则添加所需要的 HTTP 头信息
response.headers().set(
HttpHeaderNames.CONTENT_LENGTH,
file.length()
);
response.headers().set(
HttpHeaderNames.CONNECTION,
HttpHeaderValues.KEEP_ALIVE
);
}
ctx.write(response); // 将 HttpResponse 写到客户端
if (ctx.pipeline().get(SslHandler.class) == null) { // 将 index.html 写到客户端
// Netty 传输文件的时候没有使用 ByteBuf 进行向 Channel 中写入数据,而使用的 FileRegion。
// https://cloud.tencent.com/developer/article/1402669
// 这里使用 DefaultFileRegion 来利用零拷贝
ctx.write(new DefaultFileRegion(
file.getChannel(), 0, file.length()
));
} else {
// 分块上传
ctx.write(new ChunkedNioFile(file.getChannel()));
}
// LastHttpContent(空的 HttpContent),用来说明 body 的结束。
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) { // 如果没有请求 keepAlive 则在写操作完成后关闭 Channel
future.addListener(ChannelFutureListener.CLOSE);
}
}
}

/**
* 处理 CONTINUE 请求
* 细节参考:理解 HTTP 协议中的 Expect: 100-continue
* https://blog.csdn.net/skh2015java/article/details/88723028
*/
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_0, HttpResponseStatus.CONTINUE
);
ctx.writeAndFlush(response);
}

/**
* 处理一下异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

如果该 HTTP 请求指向了地址为 /ws 的 URI, 那么 HttpRequestHandler 将调用 FullHttpRequest 对象上的 retain() 方法,并通过调用 fireChannelRead(msg) 方法将它转发给下一个 ChannelInboundHandler。

之所以需要调用 retain() 方法,是因为调用 channelRead() 方法完成之后,它将调用 FullHttpRequest 对象上的 release() 方法以释放它的资源,所以这里使用 retain() 避免消息被释放掉。

如果客户端发送了 HTTP 1.1 的 HTTP 头信息 Expect: 100-continue ,那么 HttpRequestHandler 将会发送一个 100 Continue 响应。在该 HTTP 头信息被设置之后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端。这不是一个FullHttpResponse,因为它只是响应的第一个部分。

具体参考:理解 HTTP 协议中的 Expect: 100-continue

如果不需要加密和压缩,那么可以通过将 index.html 的内容存储到 defaultFileRegion 中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,你可以检查一下,是否有 sslHandler 存在于在 ChannelPipeline 中。否则,你可以使用 ChunkedNioFile。

HttpRequestHandler 将 写一个 LastHttpContent 来标记响应的结束。 如果没有请求 keep-alive,那么 HttpRequestHandler 将会添加一个 ChannelFutureListener 到最后一次写出动作的 ChannelFuture,并关闭该连接。在这里,将调用 writeAndFlush() 方法以冲刷所有之前写入的消息。

这部分代码代表了聊天服务器的第一个部分, 它管理纯粹的 HTTP 请求和响应。接下来,我们将处理传输实际聊天消息的 WebSocket 帧。

WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

处理 WebSocket 帧

由 IETF 发布的 WebSocket RFC, 定义了 6 种帧,Netty 为它们每种都提供了一个 POJO 实现。表12-1列出了这些帧类型,并描述了它们的用法。

我们的聊天应用程序将使用下面几种帧类型:

  • CloseWebSocketFrame;
  • PingWebSocketFrame;
  • PongWebSocketFrame;
  • TextWebSocketFrame。

TextwebsocketFrame 是我们唯一真正需要处理的帧类型。为了符合WebSocket RFC,Netty提供了 WebSocketServerProtocolHandler 来处理其他类型的帧。

下面的代码展示了我们用于处理 TextwebSocketFrame 的 ChannelInboundHandler,其还将在它的 ChannelGroup 中跟踪所有活动的 WebSocket 连接。

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

private final ChannelGroup group;

public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}

/**
* 重写 userEventTriggered 方法以处理自定义事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE 的方式过时了
// 现在推荐使用这种方式:(HandshakeComplete 和 HANDSHAKE_COMPLETE 一样都代表握手成功)
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
ctx.pipeline().remove(HttpRequestHandler.class);// 如果该事件握手成功,则从该 pipeline 移除这个 Handler
// 通知所有已经连接的 WebSocket,客户端新的客户端已经连接
group.writeAndFlush(new TextWebSocketFrame(
"Client " + ctx.channel() + " joined"
));
// 将新的 WebSocket Channel 添加到 ChannelGroup 里面
group.add(ctx.channel());
} else {
// 如果不是 WebSocket 握手成功事件,则将这个事件传递下去
super.userEventTriggered(ctx, evt);
}
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
group.writeAndFlush(msg.retain()); // 增加消息的引用计数,并将它写到 ChannelGroup 中所有已经连接的客户端中
}
}

TextwebSocketFrameHandler 只有一组非常少量的责任。当和新客户端的 WebSocket 握手成功完成之后,它将通过把通知消息写到 ChannelGroup 中的所有 Channel 来通知所有已经连接的客户端,然后它将把这个新 Channel 加入到该 ChannelGroup 中。

如果接收到了 TextWebSocketFrame 消息目,TextWebSocketFrameHandler 将调用 TextwebSocketFrame 消息上的 retain() 方法,并使用 writeAndFlush() 方法来将它传输给 ChannelGroup,以便所有已经连接的 WebSocket Channel 都将接收到它。

和之前一样,对于 retain() 方法的调用是必需的,因为当 channelRead0() 方法返回时,TextWebSocketFrame 的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAndFlush() 方法可能会在 channelRead0() 方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

因为 Netty 在内部处理了大部分剩下的功能,所以现在剩下唯一需要做的事情就是为每个新创建的 Channel 初始化其ChannelPipeline。为此,我们将需要一个 ChannelInitializer。

初始化 ChannelPipeline

对于 initChannel() 方法的调用,通过安装所有必需的 ChannelHandler 来设置该新注册的 Channel 的 ChannelPipeline。

public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;

public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}

@Override
protected void initChannel(Channel ch) throws Exception {
// 将所有需要的 ChannelHandler 添加到管道里面
ChannelPipeline pipeline = ch.pipeline();
// 默认的 HTTP 解码\编码器(解析 HTTP 协议),
// 它会将字节码解码为 HttpRequest、HttpContent 和 LastHttpContent。
// 并将 HttpRequest、HttpContent 和 LastHttpContent 编码为字节码
pipeline.addLast(new HttpServerCodec());
// 添加分块处理器(写入一个文件时用到它)
pipeline.addLast(new ChunkedWriteHandler());
// 这个 HttpObjectAggregator 使用参考 https://www.cnblogs.com/bihanghang/p/10218738.html
// 将一个 HttpMessage 和跟随它的多个 HttpContent 聚合为单个 FullHttpRequest 或者 FullHttpResponse(取决于它是响应还是请求)
// 安装了这个后就能确保 ChannelPipeline 中下一个 ChannelHandler 收到完整的 HTTP 请求或响应
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
// 添加上自定义的处理器,它处理 FullHttpRequest(那些不发送到 /ws URI 的请求)
pipeline.addLast(new HttpRequestHandler("/ws"));
// Netty 的 ws 协议处理器(会自动处理各种事件与其它帧),它按照 WebSocket 规范的要求,
// 处理 WebSocket 升级握手、PingWebSocketFrame、PongWebSocketFrame 和 CloseWebSocketFrom
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加自定义的帧管理器,处理 TextWebSocketFrame 和 握手完成事件
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}

Netty 的 webSocketServerProtocolHandler 处理了所有委托管理的 WebSocket 帧类型以及升级握手本身。如果握手成功,那么所需的 ChannelHandler 将会被添加到 ChannelPipeline 中,而那些不再需要的 ChannelHandler 则将会被移除。

WebSocket 协议升级之前的 ChannelPipeline 的状态如图所示。这代表了刚刚被 ChatServerInitializer 初始化之后的 ChannelPipeline。

当 WebSocket 协议升级完成之后,WebSocketServerProtocolHandler 将会把 HttpRequestDecoder 替换为webSocketFrameDecoder,把 HttpResponseEncoder 替换为 WebSocketFrameEncoder。

为了性能最大化,它将移除任何不再被 WebSocket 连接所需要的 ChannelHandler。这也包括了图12-3所示的 HttpobjectAggregator 和 HttpRequestHandler

图 12-4 展示了这些操作完成之后的 ChannelPipeline。需要注意的是,Netty 目前支持 4 个版本的 WebSocket 协议,它们每个都具有自己的实现类。Netty 将会根据客户端(这里指浏览器)所支持的版本自动地选择正确版本的 WebSocketFrameDecoder 和 webSocketFrameEncoder。

创建服务端

最后创建一个启动器,把上面的内容都组织起来

public class ChatServer {
// 创建一个 ChannelGroup,并指定它的事件处理器
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;

/**
*
* @param address 地址
*/
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));

ChannelFuture future = bootstrap.bind(address);// 绑定地址
// 等待这个 future 完成
future.syncUninterruptibly();
channel = future.channel();
return future;
}

// 创建自定义的 Channel 初始器
private ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new ChatServerInitializer(group);
}

/**
* 处理服务器关闭,并释放所有资源
*/
private void destroy() {
if (channel != null) {
channel.close();
}

channelGroup.close();
group.shutdownGracefully();
}

public static void main(String[] args) {
int port = 9999;
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
// 注册一个虚拟机关键钩子,这里关闭这个进程会自动释放全部资源
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});

future.channel().closeFuture().syncUninterruptibly();
}
}

前端测试

<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="UTF-8">
<title>Test WebSocket by Netty</title>
</head>

<body>
<script type="text/javascript">
let socket;
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:9999/ws")
socket.onmessage = (ev) => {
let ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + ev.data;
}

socket.onopen = (ev) => {
let ta = document.getElementById("responseText");
ta.value = "连接开启";
}

socket.onclose = (ev) => {
let ta = document.getElementById("responseText");
ta.value = ta.value + "\n连接关闭";
}
}
else {
alert("当前浏览器不支持 WebSocket ")
}

function send(message) {
if (!window.WebSocket) {
return;
} else {
if (socket.readyState = WebSocket.OPEN) {
socket.send(message);
}
else {
alert("连接尚未开启");
}
}
}
</script>
<form>
<textarea name="message" style="width: 400px;height: 200px">

</textarea>

<input type="button" value="发送数据" onclick="send(this.form.message.value)">

<h3>服务端输出:</h3>
<!-- 显示的内容在这里 -->
<textarea id="responseText" style="width: 400px;height: 200px"></textarea>
</form>
</body>

</html>

使用测试

启动服务器和客户端后: